1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  import java.util.Queue;
19  import java.util.concurrent.ConcurrentLinkedQueue;
20  import java.util.concurrent.atomic.*;
21  
22  import rx.*;
23  import rx.Observable.Operator;
24  import rx.exceptions.*;
25  import rx.functions.Func1;
26  import rx.internal.util.*;
27  
28  /**
29   * Flattens a list of {@link Observable}s into one {@code Observable}, without any transformation.
30   * <p>
31   * <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
32   * <p>
33   * You can combine the items emitted by multiple {@code Observable}s so that they act like a single {@code Observable}, by using the merge operation.
34   * <p>
 * The operator obtained via {@code instance(true)} behaves like the one obtained via {@code instance(false)}, except
 * that if any of the merged Observables notifies of an error via {@code onError}, it (i.e. {@code mergeDelayError})
 * refrains from propagating that error notification until all of the merged Observables have finished emitting items.
38   * <p>
39   * <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/mergeDelayError.png" alt="">
40   * <p>
41   * Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will
42   * only invoke the {@code onError} method of its Observers once.
43   * <p>
44   * This operation allows an Observer to receive all successfully emitted items from all of the
45   * source Observables without being interrupted by an error notification from one of them.
46   * <p>
 * <em>Note:</em> If this is used on an Observable that never completes, it will never call {@code onError} and will effectively swallow errors.
 *
49   * @param <T>
50   *            the type of the items emitted by both the source and merged {@code Observable}s
51   */
52  public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
    /**
     * Lazy initialization via inner-class holder: the non-delaying singleton is created
     * when this nested class is initialized, i.e. when {@link #INSTANCE} is first accessed.
     */
    private static final class HolderNoDelay {
        /** A singleton instance, constructed with {@code delayErrors == false}. */
        static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(false);
    }
    /**
     * Lazy initialization via inner-class holder: the error-delaying singleton is created
     * when this nested class is initialized, i.e. when {@link #INSTANCE} is first accessed.
     */
    private static final class HolderDelayErrors {
        /** A singleton instance, constructed with {@code delayErrors == true}. */
        static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(true);
    }
63      /**
64       * @param delayErrors should the merge delay errors?
65       * @return a singleton instance of this stateless operator.
66       */
67      @SuppressWarnings("unchecked")
68      public static <T> OperatorMerge<T> instance(boolean delayErrors) {
69          if (delayErrors) {
70              return (OperatorMerge<T>)HolderDelayErrors.INSTANCE;
71          }
72          return (OperatorMerge<T>)HolderNoDelay.INSTANCE;
73      }
74      /*
75       * benjchristensen => This class is complex and I'm not a fan of it despite writing it. I want to give some background
76       * as to why for anyone who wants to try and help improve it.
77       * 
78       * One of my first implementations that added backpressure support (Producer.request) was fairly elegant and used a simple
79       * queue draining approach. It was simple to understand as all onNext were added to their queues, then a single winner
80       * would drain the queues, similar to observeOn. It killed the Netflix API when I canaried it. There were two problems:
81       * (1) performance and (2) object allocation overhead causing massive GC pressure. Remember that merge is one of the most
82       * used operators (mostly due to flatmap) and is therefore critical to and a limiter of performance in any application.
83       * 
84       * All subsequent work on this class and the various fast-paths and branches within it have been to achieve the needed functionality
85       * while reducing or eliminating object allocation and keeping performance acceptable.
86       * 
87       * This has meant adopting strategies such as:
88       * 
89       * - ring buffers instead of growable queues
90       * - object pooling
91       * - skipping request logic when downstream does not need backpressure
92       * - ScalarValueQueue for optimizing synchronous single-value Observables
93       * - adopting data structures that use Unsafe (and gating them based on environment so non-Oracle JVMs still work)
94       * 
95       * It has definitely increased the complexity and maintenance cost of this class, but the performance gains have been significant.
96       * 
97       * The biggest cost of the increased complexity is concurrency bugs and reasoning through what's going on.
98       * 
99       * I'd love to have contributions that improve this class, but keep in mind the performance and GC pressure.
100      * The benchmarks I use are in the JMH OperatorMergePerf class. GC memory pressure is tested using Java Flight Recorder
101      * to track object allocation.
102      */
103 
104     private OperatorMerge() {
105         this.delayErrors = false;
106     }
107 
108     private OperatorMerge(boolean delayErrors) {
109         this.delayErrors = delayErrors;
110     }
111 
112     private final boolean delayErrors;
113 
114     @Override
115     public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
116         return new MergeSubscriber<T>(child, delayErrors);
117 
118     }
119 
120     private static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> {
        /** Used by {@code drainScalarValueQueue()} to hand objects polled from the scalar queue to {@link #actual}. */
        final NotificationLite<T> on = NotificationLite.instance();
        /** The downstream subscriber that receives the merged items and the terminal event. */
        final Subscriber<? super T> actual;
        /** The producer set on {@link #actual}; its {@code requested} field holds the outstanding downstream demand. */
        private final MergeProducer<T> mergeProducer;
        /** Number of subscribed (non-scalar) inner sources that have not terminated; modified inside {@code synchronized (this)}. */
        private int wip;
        /** Set to {@code true} by this subscriber's {@code onCompleted()} and {@code onError()}. */
        private boolean completed;
        /** If {@code true}, errors are collected in {@link #exceptions} instead of being forwarded immediately. */
        private final boolean delayErrors;
        /** Errors collected in delay-errors mode; created lazily by {@code innerError()}. */
        private ConcurrentLinkedQueue<Throwable> exceptions;

        /** The currently registered inner subscribers; created lazily by {@code handleNewSource()}. */
        private volatile SubscriptionIndexedRingBuffer<InnerSubscriber<T>> childrenSubscribers;

        /** Queue for scalar values that could not be emitted right away; created lazily by {@code getOrCreateScalarValueQueue()}. */
        private volatile RxRingBuffer scalarValueQueue = null;

        /* protected by lock on MergeSubscriber instance */
        /** Counts {@code getEmitLock()} attempts that failed because the lock was held; reset when the lock is acquired. */
        private int missedEmitting = 0;
        /** {@code true} while a caller of {@code getEmitLock()} holds the emit lock. */
        private boolean emitLock = false;
136 
137         /**
138          * Using synchronized(this) for `emitLock` instead of ReentrantLock or AtomicInteger is faster when there is no contention.
139          * 
140          * <pre> {@code
141          * Using ReentrantLock:
142          * r.o.OperatorMergePerf.merge1SyncStreamOfN           1000  thrpt         5    44185.294     1295.565    ops/s
143          * 
144          * Using synchronized(this):
145          * r.o.OperatorMergePerf.merge1SyncStreamOfN           1000  thrpt         5    79715.981     3704.486    ops/s
146          * 
147          * Still slower though than allowing concurrency:
148          * r.o.OperatorMergePerf.merge1SyncStreamOfN           1000  thrpt         5   149331.046     4851.290    ops/s
149          * } </pre>
150          */
151 
        /**
         * Creates the subscriber for the outer sequence and wires it to the downstream subscriber.
         *
         * @param actual the downstream subscriber; this instance is added to it as a subscription and
         *               a new {@code MergeProducer} is set on it
         * @param delayErrors whether errors are collected and delivered only at the end
         */
        public MergeSubscriber(Subscriber<? super T> actual, boolean delayErrors) {
            // super(actual) shares the subscription list with the downstream subscriber
            super(actual);
            this.actual = actual;
            this.mergeProducer = new MergeProducer<T>(this);
            this.delayErrors = delayErrors;
            // decoupled the subscription chain because we need to decouple and control backpressure
            actual.add(this);
            // downstream requests now arrive at mergeProducer.request(...)
            actual.setProducer(mergeProducer);
        }
161 
        /**
         * Issues the initial upstream request for {@code RxRingBuffer.SIZE} source Observables.
         */
        @Override
        public void onStart() {
            // we request backpressure so we can handle long-running Observables that are enqueueing, such as flatMap use cases
            // we decouple the Producer chain while keeping the Subscription chain together (perf benefit) via super(actual)
            request(RxRingBuffer.SIZE);
        }
168 
169         /*
170          * This is expected to be executed sequentially as per the Rx contract or it will not work.
171          */
172         @Override
173         public void onNext(Observable<? extends T> t) {
174             if (t instanceof ScalarSynchronousObservable) {
175                 ScalarSynchronousObservable<? extends T> t2 = (ScalarSynchronousObservable<? extends T>)t;
176                 handleScalarSynchronousObservable(t2);
177             } else {
178                 if (t == null || isUnsubscribed()) {
179                     return;
180                 }
181                 synchronized (this) {
182                     // synchronized here because `wip` can be concurrently changed by children Observables
183                     wip++;
184                 }
185                 handleNewSource(t);
186             }
187         }
188 
189         private void handleNewSource(Observable<? extends T> t) {
190             if (childrenSubscribers == null) {
191                 // lazily create this only if we receive Observables we need to subscribe to
192                 childrenSubscribers = new SubscriptionIndexedRingBuffer<InnerSubscriber<T>>();
193                 add(childrenSubscribers);
194             }
195             MergeProducer<T> producerIfNeeded = null;
196             // if we have received a request then we need to respect it, otherwise we fast-path
197             if (mergeProducer.requested != Long.MAX_VALUE) {
198                 /**
199                  * <pre> {@code
200                  * With this optimization:
201                  * 
202                  * r.o.OperatorMergePerf.merge1SyncStreamOfN      1000  thrpt         5    57100.080     4686.331    ops/s
203                  * r.o.OperatorMergePerf.merge1SyncStreamOfN   1000000  thrpt         5       60.875        1.622    ops/s
204                  *  
205                  * Without this optimization:
206                  * 
207                  * r.o.OperatorMergePerf.merge1SyncStreamOfN      1000  thrpt         5    29863.945     1858.002    ops/s
208                  * r.o.OperatorMergePerf.merge1SyncStreamOfN   1000000  thrpt         5       30.516        1.087    ops/s
209                  * } </pre>
210                  */
211                 producerIfNeeded = mergeProducer;
212             }
213             InnerSubscriber<T> i = new InnerSubscriber<T>(this, producerIfNeeded);
214             i.sindex = childrenSubscribers.add(i);
215             t.unsafeSubscribe(i);
216             if (!isUnsubscribed()) {
217                 request(1);
218             }
219         }
220 
221         private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? extends T> t) {
222             // fast-path for scalar, synchronous values such as Observable.from(int)
223             /**
224              * Without this optimization:
225              * 
226              * <pre> {@code
227              * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
228              * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  2,418,452.409   130572.665    ops/s
229              * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5     5,690.456       94.958    ops/s
230              * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5          takes too long
231              * 
232              * With this optimization:
233              * 
234              * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5,475,300.198   156741.334    ops/s
235              * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    68,932.278     1311.023    ops/s
236              * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       64.405        0.611    ops/s
237              * } </pre>
238              * 
239              */
240             if (mergeProducer.requested == Long.MAX_VALUE) {
241                 handleScalarSynchronousObservableWithoutRequestLimits(t);
242             } else {
243                 handleScalarSynchronousObservableWithRequestLimits(t);
244             }
245         }
246 
247         private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchronousObservable<? extends T> t) {
248             T value = t.get();
249             if (getEmitLock()) {
250                 boolean moreToDrain;
251                 try {
252                     actual.onNext(value);
253                 } finally {
254                     moreToDrain = releaseEmitLock();
255                 }
256                 if (moreToDrain) {
257                     drainQueuesIfNeeded();
258                 }
259                 request(1);
260                 return;
261             } else {
262                 try {
263                     getOrCreateScalarValueQueue().onNext(value);
264                 } catch (MissingBackpressureException e) {
265                     onError(e);
266                 }
267                 return;
268             }
269         }
270 
271         private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronousObservable<? extends T> t) {
272             if (getEmitLock()) {
273                 boolean emitted = false;
274                 boolean moreToDrain;
275                 boolean isReturn = false;
276                 try {
277                     long r = mergeProducer.requested;
278                     if (r > 0) {
279                         emitted = true;
280                         actual.onNext(t.get());
281                         MergeProducer.REQUESTED.decrementAndGet(mergeProducer);
282                         // we handle this Observable without ever incrementing the wip or touching other machinery so just return here
283                         isReturn = true;
284                     }
285                 } finally {
286                     moreToDrain = releaseEmitLock();
287                 }
288                 if (moreToDrain) {
289                     drainQueuesIfNeeded();
290                 }
291                 if (emitted) {
292                     request(1);
293                 }
294                 if (isReturn) {
295                     return;
296                 }
297             }
298 
299             // if we didn't return above we need to enqueue
300             // enqueue the values for later delivery
301             try {
302                 getOrCreateScalarValueQueue().onNext(t.get());
303             } catch (MissingBackpressureException e) {
304                 onError(e);
305             }
306         }
307 
308         private RxRingBuffer getOrCreateScalarValueQueue() {
309             RxRingBuffer svq = scalarValueQueue;
310             if (svq == null) {
311                 svq = RxRingBuffer.getSpscInstance();
312                 scalarValueQueue = svq;
313             }
314             return svq;
315         }
316 
317         private synchronized boolean releaseEmitLock() {
318             emitLock = false;
319             if (missedEmitting == 0) {
320                 return false;
321             } else {
322                 return true;
323             }
324         }
325 
326         private synchronized boolean getEmitLock() {
327             if (emitLock) {
328                 missedEmitting++;
329                 return false;
330             } else {
331                 emitLock = true;
332                 missedEmitting = 0;
333                 return true;
334             }
335         }
336 
337         private boolean drainQueuesIfNeeded() {
338             while (true) {
339                 if (getEmitLock()) {
340                     int emitted = 0;
341                     boolean moreToDrain;
342                     try {
343                         emitted = drainScalarValueQueue();
344                         drainChildrenQueues();
345                     } finally {
346                         moreToDrain = releaseEmitLock();
347                     }
348                     // request outside of lock
349                     if (emitted > 0) {
350                         request(emitted);
351                     }
352                     if (!moreToDrain) {
353                         return true;
354                     }
355                     // otherwise we'll loop and get whatever was added
356                 } else {
357                     return false;
358                 }
359             }
360         }
361 
362         int lastDrainedIndex = 0;
363 
364         /**
365          * ONLY call when holding the EmitLock.
366          */
367         private void drainChildrenQueues() {
368             if (childrenSubscribers != null) {
369                 lastDrainedIndex = childrenSubscribers.forEach(DRAIN_ACTION, lastDrainedIndex);
370             }
371         }
372 
373         /**
374          * ONLY call when holding the EmitLock.
375          */
376         private int drainScalarValueQueue() {
377             RxRingBuffer svq = scalarValueQueue;
378             if (svq != null) {
379                 long r = mergeProducer.requested;
380                 int emittedWhileDraining = 0;
381                 if (r < 0) {
382                     // drain it all
383                     Object o = null;
384                     while ((o = svq.poll()) != null) {
385                         on.accept(actual, o);
386                         emittedWhileDraining++;
387                     }
388                 } else if (r > 0) {
389                     // drain what was requested
390                     long toEmit = r;
391                     for (int i = 0; i < toEmit; i++) {
392                         Object o = svq.poll();
393                         if (o == null) {
394                             break;
395                         } else {
396                             on.accept(actual, o);
397                             emittedWhileDraining++;
398                         }
399                     }
400                     // decrement the number we emitted from outstanding requests
401                     MergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining);
402                 }
403                 return emittedWhileDraining;
404             }
405             return 0;
406         }
407 
408         final Func1<InnerSubscriber<T>, Boolean> DRAIN_ACTION = new Func1<InnerSubscriber<T>, Boolean>() {
409 
410             @Override
411             public Boolean call(InnerSubscriber<T> s) {
412                 if (s.q != null) {
413                     long r = mergeProducer.requested;
414                     int emitted = s.drainQueue();
415                     if (emitted > 0) {
416                         s.requestMore(emitted);
417                     }
418                     if (emitted == r) {
419                         // we emitted as many as were requested so stop the forEach loop
420                         return Boolean.FALSE;
421                     }
422                 }
423                 return Boolean.TRUE;
424             }
425 
426         };
427 
428         @Override
429         public void onError(Throwable e) {
430             if (!completed) {
431                 completed = true;
432                 innerError(e, true);
433             }
434         }
435         
436         private void innerError(Throwable e, boolean parent) {
437             if (delayErrors) {
438                 synchronized (this) {
439                     if (exceptions == null) {
440                         exceptions = new ConcurrentLinkedQueue<Throwable>();
441                     }
442                 }
443                 exceptions.add(e);
444                 boolean sendOnComplete = false;
445                 synchronized (this) {
446                     if (!parent) {
447                         wip--;
448                     }
449                     if ((wip == 0 && completed) || (wip < 0)) {
450                         sendOnComplete = true;
451                     }
452                 }
453                 if (sendOnComplete) {
454                     drainAndComplete();
455                 }
456             } else {
457                 actual.onError(e);
458             }
459         }
460 
461         @Override
462         public void onCompleted() {
463             boolean c = false;
464             synchronized (this) {
465                 completed = true;
466                 if (wip == 0) {
467                     c = true;
468                 }
469             }
470             if (c) {
471                 // complete outside of lock
472                 drainAndComplete();
473             }
474         }
475 
476         void completeInner(InnerSubscriber<T> s) {
477             boolean sendOnComplete = false;
478             synchronized (this) {
479                 wip--;
480                 if (wip == 0 && completed) {
481                     sendOnComplete = true;
482                 }
483             }
484             childrenSubscribers.remove(s.sindex);
485             if (sendOnComplete) {
486                 drainAndComplete();
487             }
488         }
489 
490         private void drainAndComplete() {
491             boolean moreToDrain = true;
492             while (moreToDrain) {
493                 synchronized (this) {
494                     missedEmitting = 0;
495                 }
496                 drainScalarValueQueue();
497                 drainChildrenQueues();
498                 synchronized (this) {
499                     moreToDrain = missedEmitting > 0;
500                 }
501             }
502             RxRingBuffer svq = scalarValueQueue;
503             if (svq == null || svq.isEmpty()) {
504                 if (delayErrors) {
505                     Queue<Throwable> es = null;
506                     synchronized (this) {
507                         es = exceptions;
508                     }
509                     if (es != null) {
510                         if (es.isEmpty()) {
511                             actual.onCompleted();
512                         } else if (es.size() == 1) {
513                             actual.onError(es.poll());
514                         } else {
515                             actual.onError(new CompositeException(es));
516                         }
517                     } else {
518                         actual.onCompleted();
519                     }
520                 } else {
521                     actual.onCompleted();
522                 }
523             }
524         }
525 
526     }
527 
528     private static final class MergeProducer<T> implements Producer {
529 
530         private final MergeSubscriber<T> ms;
531 
532         public MergeProducer(MergeSubscriber<T> ms) {
533             this.ms = ms;
534         }
535 
536         private volatile long requested = 0;
537         @SuppressWarnings("rawtypes")
538         static final AtomicLongFieldUpdater<MergeProducer> REQUESTED = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "requested");
539 
540         @Override
541         public void request(long n) {
542             if (requested == Long.MAX_VALUE) {
543                 return;
544             }
545             if (n == Long.MAX_VALUE) {
546                 requested = Long.MAX_VALUE;
547             } else {
548                 BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
549                 if (ms.drainQueuesIfNeeded()) {
550                     boolean sendComplete = false;
551                     synchronized (ms) {
552                         if (ms.wip == 0 && ms.scalarValueQueue != null && ms.scalarValueQueue.isEmpty()) {
553                             sendComplete = true;
554                         }
555                     }
556                     if (sendComplete) {
557                         ms.drainAndComplete();
558                     }
559                 }
560             }
561         }
562 
563     }
564 
565     private static final class InnerSubscriber<T> extends Subscriber<T> {
        /** Index returned when this subscriber was added to the parent's {@code childrenSubscribers}; used to remove it again. */
        public int sindex;
        /** The {@code MergeSubscriber} this inner subscriber emits through. */
        final MergeSubscriber<T> parentSubscriber;
        /** The downstream demand tracker, or {@code null} when no request accounting is done for this subscriber. */
        final MergeProducer<T> producer;
        /** Make sure the inner termination events are delivered only once. */
        @SuppressWarnings("unused")
        volatile int terminated;
        /** Updater used to switch {@link #terminated} from 0 to 1 atomically. */
        @SuppressWarnings("rawtypes")
        static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");

        /** Buffer for events of this inner source that could not be emitted immediately. */
        private final RxRingBuffer q = RxRingBuffer.getSpscInstance();
576 
        /**
         * Creates an inner subscriber that emits through the given parent.
         *
         * @param parent the {@code MergeSubscriber} to emit through
         * @param producer the downstream demand tracker, or {@code null} to skip request accounting
         */
        public InnerSubscriber(MergeSubscriber<T> parent, MergeProducer<T> producer) {
            this.parentSubscriber = parent;
            this.producer = producer;
            // register the queue as a subscription of this subscriber
            add(q);
            // ask the inner source for as many items as the queue reports as its capacity
            request(q.capacity());
        }
583 
        /**
         * Forwards an item of the inner source to {@code emit} as a non-terminal event.
         *
         * @param t the item
         */
        @Override
        public void onNext(T t) {
            emit(t, false);
        }
588 
589         @Override
590         public void onError(Throwable e) {
591             // it doesn't go through queues, it immediately onErrors and tears everything down
592             if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
593                 parentSubscriber.innerError(e, false);
594             }
595         }
596 
597         @Override
598         public void onCompleted() {
599             if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
600                 emit(null, true);
601             }
602         }
603 
        /**
         * Requests {@code n} more items from the inner source by delegating to {@code request(n)}.
         *
         * @param n the number of additional items to request
         */
        public void requestMore(long n) {
            request(n);
        }
607 
        /**
         * Delivers an item or the completion of this inner source, either directly to the downstream
         * subscriber (when the parent's emit lock can be taken) or via this subscriber's queue.
         * <p>
         * With the lock held, the own queue is drained first. Then, without a producer the event is
         * delivered directly; with a producer it is delivered directly only if there is outstanding demand
         * and the own queue is empty, otherwise it is enqueued. After the lock is released the number of
         * emitted items is requested from the inner source again. Finally, if the event was enqueued or
         * emission attempts were missed while the lock was held, the parent is asked to drain.
         *
         * @param t the item to deliver; not used when {@code complete} is {@code true}
         * @param complete {@code true} to deliver the completion of this inner source instead of an item
         */
        private void emit(T t, boolean complete) {
            boolean drain = false;
            boolean enqueue = true;
            /**
             * This optimization to skip the queue is messy ... but it makes a big difference in performance when merging a single stream
             * with many values, or many intermittent streams without contention. It doesn't make much of a difference if there is contention.
             * 
             * Below are some of the relevant benchmarks to show the difference.
             * 
             * <pre> {@code
             * With this fast-path:
             * 
             * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
             * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5344143.680   393484.592    ops/s
             * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    83582.662     4293.755    ops/s +++
             * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       73.889        4.477    ops/s +++
             * 
             * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5799265.333   199205.296    ops/s +
             * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       62.655        2.521    ops/s +++
             * 
             * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76925.616     4909.174    ops/s
             * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3634.977      242.469    ops/s
             * 
             * Without:
             * 
             * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
             * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5099295.678   159539.842    ops/s
             * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    18196.671    10053.298    ops/s
             * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       19.184        1.028    ops/s
             * 
             * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5591612.719   591821.763    ops/s
             * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       21.018        3.251    ops/s
             * 
             * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    72692.073    18395.031    ops/s
             * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     4379.093      386.368    ops/s
             * } </pre>
             * 
             * It looks like it may cause a slowdown in highly contended cases (like 'mergeTwoAsyncStreamsOfN' above) as instead of just
             * putting in the queue, it attempts to get the lock. We are optimizing for the non-contended case.
             */
            if (parentSubscriber.getEmitLock()) {
                // number of items delivered downstream while holding the lock
                long emitted = 0;
                enqueue = false;
                try {
                    // drain the queue if there is anything in it before emitting the current value
                    emitted += drainQueue();
                    if (producer == null) {
                        // no backpressure requested
                        if (complete) {
                            parentSubscriber.completeInner(this);
                        } else {
                            try {
                                parentSubscriber.actual.onNext(t);
                            } catch (Throwable e) {
                                // special error handling due to complexity of merge
                                onError(OnErrorThrowable.addValueAsLastCause(e, t));
                            }
                            // counted even if onNext threw
                            emitted++;
                        }
                    } else {
                        // this needs to check q.count() as draining above may not have drained the full queue
                        // perf tests show this to be okay, though different queue implementations could perform poorly with this
                        if (producer.requested > 0 && q.count() == 0) {
                            if (complete) {
                                parentSubscriber.completeInner(this);
                            } else {
                                try {
                                    parentSubscriber.actual.onNext(t);
                                } catch (Throwable e) {
                                    // special error handling due to complexity of merge
                                    onError(OnErrorThrowable.addValueAsLastCause(e, t));
                                }
                                emitted++;
                                // NOTE(review): producer.requested may have become Long.MAX_VALUE since this
                                // subscriber was created; this decrement would then move it off MAX_VALUE - confirm
                                MergeProducer.REQUESTED.decrementAndGet(producer);
                            }
                        } else {
                            // no requests available, so enqueue it
                            enqueue = true;
                        }
                    }
                } finally {
                    // true if somebody failed to take the lock while we held it
                    drain = parentSubscriber.releaseEmitLock();
                }
                // request upstream what we just emitted
                if(emitted > 0) {
                    request(emitted);
                }
            }
            if (enqueue) {
                enqueue(t, complete);
                drain = true;
            }
            if (drain) {
                /**
                 * This extra check for whether to call drain is ugly, but it helps:
                 * <pre> {@code
                 * Without:
                 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN     1000  thrpt         5       61.812        1.455    ops/s
                 * 
                 * With:
                 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN     1000  thrpt         5       78.795        1.766    ops/s
                 * } </pre>
                 */
                parentSubscriber.drainQueuesIfNeeded();
            }
        }
715 
716         private void enqueue(T t, boolean complete) {
717             try {
718                 if (complete) {
719                     q.onCompleted();
720                 } else {
721                     q.onNext(t);
722                 }
723             } catch (MissingBackpressureException e) {
724                 onError(e);
725             }
726         }
727 
728         private int drainRequested() {
729             int emitted = 0;
730             // drain what was requested
731             long toEmit = producer.requested;
732             Object o;
733             for (int i = 0; i < toEmit; i++) {
734                 o = q.poll();
735                 if (o == null) {
736                     // no more items
737                     break;
738                 } else if (q.isCompleted(o)) {
739                     parentSubscriber.completeInner(this);
740                 } else {
741                     try {
742                         if (!q.accept(o, parentSubscriber.actual)) {
743                             emitted++;
744                         }
745                     } catch (Throwable e) {
746                         // special error handling due to complexity of merge
747                         onError(OnErrorThrowable.addValueAsLastCause(e, o));
748                     }
749                 }
750             }
751 
752             // decrement the number we emitted from outstanding requests
753             MergeProducer.REQUESTED.getAndAdd(producer, -emitted);
754             return emitted;
755         }
756 
757         private int drainAll() {
758             int emitted = 0;
759             // drain it all
760             Object o;
761             while ((o = q.poll()) != null) {
762                 if (q.isCompleted(o)) {
763                     parentSubscriber.completeInner(this);
764                 } else {
765                     try {
766                         if (!q.accept(o, parentSubscriber.actual)) {
767                             emitted++;
768                         }
769                     } catch (Throwable e) {
770                         // special error handling due to complexity of merge
771                         onError(OnErrorThrowable.addValueAsLastCause(e, o));
772                     }
773                 }
774             }
775             return emitted;
776         }
777 
778         private int drainQueue() {
779             if (producer != null) {
780                 return drainRequested();
781             } else {
782                 return drainAll();
783             }
784         }
785     }
786 }